package org.zjvis.datascience.common.algo;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.enums.AlgEnum;
import org.zjvis.datascience.common.enums.SubTypeEnum;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.util.ToolUtil;
import org.zjvis.datascience.common.vo.TaskVO;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/**
 * @description Kmeans 聚类算子模板类 [已废弃]
 * @date 2021-12-24
 */
@Deprecated
public class KmeansAlg extends BaseAlg {

    private final static Logger logger = LoggerFactory.getLogger("KmeansAlg");


    private final String TPL_FILENAME = "template/algo/kmeans.json";


    private static String KMEANS_SQL_MADLIB = "select * from  \"%s\".\"kmeans_pp\"('%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', %d, %f)";

    private static String KMEANS_SQL_MADLIB_SAMPLE = "select * from  \"%s\".\"kmeans_complex\"('CREATE VIEW %s AS SELECT * from %s where \"%s\" <= %s', '%s', '%s', '%s', '%s', %d, '%s', '%s', %d, %f)";

    private static String KMEANS_SQL_SPARK = "k-Means -s %s -k %d -f %s -m %d -t %s -uk %d -idcol %s";


    public KmeansAlg() {
        super(AlgEnum.KMEANS.name(), SubTypeEnum.CLUSTER.getVal(), SubTypeEnum.CLUSTER.getDesc());
        this.maxParentNumber = 1;
    }

    /**
     * eg: select * from kmeans('dataset.wyz_testdata','dataset.kmeans_0823_model',
     * 'dataset.kmeans_0823_result', 'name', 'a1,a2,a3', 3, 'madlib.squared_dist_norm2',
     * 'madlib.avg', 10, 0.001)
     *
     * @return
     */
    public String getKmeansSql(String sourceTable, String outTable,
                               String modelTable, String idCol, String featureCols, int k, String fnFun,
                               String aggFun, int maxIter, float minFrac, long timeStamp, String sampleTable) {
        if (StringUtils.isNotEmpty(sampleTable)) {
            List<String> fields = new ArrayList<>();
            fields.add(String.format("\"%s\"", idCol));
            String[] tmps = featureCols.split(",");
            for (String item : tmps) {
                fields.add(String.format("\"%s\"", item));
            }
            return String
                    .format(KMEANS_SQL_MADLIB_SAMPLE, SqlTemplate.SCHEMA, sampleTable, sourceTable,
                            idCol, SAMPLE_NUMBER, modelTable, outTable, idCol, featureCols, k, fnFun,
                            aggFun, maxIter, minFrac);
        } else {
            // 全量, 根据配置
            if (getEngine().isMadlib()) {
                return String.format(KMEANS_SQL_MADLIB, SqlTemplate.SCHEMA, sourceTable, modelTable,
                        outTable, idCol, featureCols, k, fnFun, aggFun, maxIter, minFrac);
            } else if (getEngine().isSpark()) {
                return String
                        .format(KMEANS_SQL_SPARK, sourceTable, k, featureCols, maxIter, outTable,
                                timeStamp, idCol);
            }
        }
        return StringUtils.EMPTY;
    }

    public void initTemplate(JSONObject data) {
        JSONArray jsonArray = getTemplateParamList(TPL_FILENAME);
        data.put("setParams", jsonArray);
        baseInitTemplate(data);
        JSONArray validate = new JSONArray();
        validate.add("feature_cols,number");
        data.put("validate", validate);
    }

    public String initSql(JSONObject json, List<SqlHelper> sqlHelpers, long timeStamp,
                          String engineName) {
        this.engineName = engineName;
        String sourceTable = json.getString("source_table");
        sourceTable = ToolUtil.alignTableName(sourceTable, timeStamp);
        String outTable = json.getString("out_table_rename");
        outTable = ToolUtil.alignTableName(outTable, timeStamp);
        String modelTable = json.getString("model_table_rename");
        modelTable = ToolUtil.alignTableName(modelTable, timeStamp);

        JSONArray features = json.getJSONArray("feature_cols");
        String featureCols = this.getFeatureColsStr(features);
        if (StringUtils.isEmpty(featureCols)) {
            return StringUtils.EMPTY;
        }
        int k = json.getInteger("k");
        String fnFun = json.getString("fn_dist");
        String aggFun = json.getString("agg_centroid");
        int maxIter = json.getInteger("max_iter");
        float minFrac = json.getFloat("min_frac");

        String sampleTable = "";
        System.out.println("->   ->" + json.toJSONString());
        if (!json.containsKey("isSample") ||
                json.getString("isSample").equals("SUCCESS") ||
                json.getString("isSample").equals("FAIL")) {
            sampleTable = outTable.replace("solid_", "view_");
            json.put("isSample", "CREATE");
            System.out.println("-> 111111111111111111111111");
        } else {
            System.out.println("-> 22222222222222222222");
        }

        return getKmeansSql(sourceTable, outTable, modelTable, ID_COL,
                featureCols, k, fnFun, aggFun, maxIter, minFrac, timeStamp, sampleTable);
    }

    public void defineOutput(TaskVO vo) {
        JSONObject jsonObject = vo.getData();
        String outTablePrefix = jsonObject.getString("out_table");
        String tableName = String
                .format(SqlTemplate.OUT_TABLE_NAME, outTablePrefix, vo.getPipelineId(), vo.getId());
        jsonObject.put("out_table_rename", tableName);
        String modelTable = jsonObject.getString("model_table");
        modelTable = String
                .format(SqlTemplate.OUT_TABLE_NAME, modelTable, vo.getPipelineId(), vo.getId());
        jsonObject.put("model_table_rename", modelTable);
        JSONArray input = jsonObject.getJSONArray("input");
        // String idColType = "";
        JSONArray resultOutputTypes = new JSONArray();
        JSONArray resultCols = new JSONArray();
        if (input == null || input.size() == 0) {
            logger.warn("input is empty");
            return;
        }
        this.checkBoxSelectFilter(jsonObject, "number", FEATURE_COLS);
        this.supplementForCheckbox(jsonObject, TPL_FILENAME, 4, vo);
        if (input != null && input.size() > 0) {
            if (!jsonObject.containsKey("feature_cols")) {
                logger.warn("feature_cols not exists!!!");
                return;
            }
            List<String> featuresTmp = jsonObject.getJSONArray("feature_cols")
                    .toJavaList(String.class);
            List<String> features = new ArrayList<>();
            for (String f : featuresTmp) {
                if (f.contains(".")) {
                    String[] tmps = f.split("\\.");
                    features.add(tmps[tmps.length - 1]);
                }
            }
            String sourceTable = input.getJSONObject(0).getString("tableName");
            List<String> inputCols = input.getJSONObject(0).getJSONArray("tableCols")
                    .toJavaList(String.class);
            List<String> inputColumnTypes = input.getJSONObject(0).getJSONArray("columnTypes")
                    .toJavaList(String.class);
            jsonObject.put("source_table", sourceTable);
            resultCols.addAll(inputCols);
            resultOutputTypes.addAll(inputColumnTypes);
        }
        JSONArray output = new JSONArray();
        String[] cols = new String[]{"cluster_variance", "num_iterations", "objective_fn",
                "centroids", "frac_reassigned"};
        String[] types = new String[]{"text", "integer", "double precision", "text",
                "double precision"};
        JSONObject modelItem = new JSONObject();
        JSONArray modelCols = new JSONArray(Arrays.asList(cols));
        JSONArray modelColumnTypes = new JSONArray(Arrays.asList(types));
        modelItem.put("tableName", modelTable);
        modelItem.put("nodeName", vo.getName() == null ? AlgEnum.KMEANS.toString() : vo.getName());
        modelItem.put("tableCols", modelCols);
        modelItem.put("columnTypes", modelColumnTypes);

        resultCols.add("cluster_id");
        resultOutputTypes.add("text");
        JSONObject resultItem = new JSONObject();
        resultItem.put("tableName", tableName);
        resultItem.put("tableCols", resultCols);
        resultItem.put("nodeName", vo.getName() == null ? AlgEnum.KMEANS.toString() : vo.getName());
        resultItem.put("columnTypes", resultOutputTypes);
        this.setSubTypeForOutput(resultItem);
        output.add(resultItem);

        jsonObject.put("output", output);
        vo.setData(jsonObject);
    }
}
